package com.nstskj.study.netty.tcp.netty.protocol.executor.end.impl;

import cn.hutool.core.collection.BoundedPriorityQueue;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DatePattern;
import com.nstskj.study.netty.tcp.netty.config.properties.NettyNetDevTcpServerProperties;
import com.nstskj.study.netty.tcp.netty.config.properties.ThreadExecutionProperties;
import com.nstskj.study.netty.tcp.netty.protocol.endpoint.base.AbstractEndpoint;
import com.nstskj.study.netty.tcp.netty.protocol.endpoint.data.EndpointData;
import com.nstskj.study.netty.tcp.netty.protocol.endpoint.serialize.EndpointDataSerialize;
import com.nstskj.study.netty.tcp.netty.protocol.event.CloseNetDeviceEvent;
import com.nstskj.study.netty.tcp.netty.protocol.event.CreateNetDeviceEvent;
import com.nstskj.study.netty.tcp.netty.protocol.event.RegisterNetDeviceEvent;
import com.nstskj.study.netty.tcp.netty.protocol.event.base.AbstractNetDeviceEvent;
import com.nstskj.study.netty.tcp.netty.protocol.executor.AbstractWorkExecutor;
import com.nstskj.study.netty.tcp.netty.protocol.executor.end.DeviceEndpointDataWorkHandlerExecutor;
import com.nstskj.study.netty.tcp.netty.protocol.executor.work.AbstractTcpProtocolWork;
import com.nstskj.study.netty.tcp.netty.protocol.handler.end.DeviceEndpointHandler;
import com.nstskj.study.netty.tcp.netty.protocol.message.out.msg.base.AbstractOutputNetTcpMessage;
import com.nstskj.study.netty.tcp.netty.protocol.session.base.AbstractSession;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author ZhouChuGang
 * @version 1.0
 * @project nstskj-study-netty-tcp-spring
 * @date 2021/5/4 17:37
 * @Description
 */
@Slf4j
@Component
public class SpringDeviceEndpointDataWorkHandlerExecutorImpl extends AbstractWorkExecutor implements DeviceEndpointDataWorkHandlerExecutor, ApplicationListener<AbstractNetDeviceEvent> {

    @Autowired
    private DeviceEndpointHandler deviceEndpointHandler;

    @Autowired
    private EndpointDataSerialize endpointDataSerialize;

    /**
     * 每个会话的端点处理者
     * Map<sessionId,Map<address,work>>
     */
    private final Map<String, Map<Short, AbstractTcpProtocolWork<EndpointData>>> workExecutorMap = new ConcurrentHashMap<>();

    /**
     * 得到对应的线程池配置参数
     *
     * @param nettyNetDevTcpServerProperties
     * @return
     */
    @Override
    protected ThreadExecutionProperties getThreadExecutionProperties(NettyNetDevTcpServerProperties nettyNetDevTcpServerProperties) {
        return nettyNetDevTcpServerProperties.getDeviceEndpointDataWorkExecutor();
    }

    /**
     * 向指定的sessionId的指定端点地址发送数据
     *
     * @param sessionId  会话id
     * @param address    地址
     * @param endDataStr 端点数据
     * @return
     */
    @Override
    public boolean sendEndpointData(String sessionId, short address, String endDataStr) {
        Map<Short, AbstractTcpProtocolWork<EndpointData>> tcpProtocolWorkMap = this.workExecutorMap.get(sessionId);
        if (CollUtil.isNotEmpty(tcpProtocolWorkMap)) {
            AbstractTcpProtocolWork<EndpointData> tcpProtocolWork = tcpProtocolWorkMap.get(address);
            if (tcpProtocolWork != null) {
                EndpointData endpointData = new EndpointData();
                endpointData.setTimestamp(System.currentTimeMillis());
                endpointData.setTimestampStr(DatePattern.NORM_DATETIME_FORMAT.format(endpointData.getTimestamp()));
                endpointData.setAddress(address);
                endpointData.setHexStr(endDataStr);
                return tcpProtocolWork.getBlockingQueue().offer(endpointData);
            }
        }
        return false;
    }

    @Override
    public List<EndpointData> getEndpointDataList(String sessionId, short address) {
        Map<Short, AbstractTcpProtocolWork<EndpointData>> tcpProtocolWorkMap = this.workExecutorMap.get(sessionId);
        if (CollUtil.isNotEmpty(tcpProtocolWorkMap)) {
            AbstractTcpProtocolWork<EndpointData> tcpProtocolWork = tcpProtocolWorkMap.get(address);
            if (tcpProtocolWork != null) {
                return ((DeviceEndpointDataHandlerWork) tcpProtocolWork).getEndpointDataFifoList();
            }
        }
        return Collections.emptyList();
    }

    /**
     * 消息监听
     *
     * @param netDeviceEvent
     */
    @Override
    public void onApplicationEvent(AbstractNetDeviceEvent netDeviceEvent) {
        final AbstractSession abstractSession = netDeviceEvent.getAbstractSession();
        String sessionId = abstractSession.getSessionId();
        if (netDeviceEvent instanceof CreateNetDeviceEvent) {
            //如果为创建事件，则分配Map给其对象
            Map<Short, AbstractTcpProtocolWork<EndpointData>> map = new ConcurrentHashMap<>();
            this.workExecutorMap.put(sessionId, map);
        } else if (netDeviceEvent instanceof RegisterNetDeviceEvent) {
            //注册事件， 到了这里说明设备的端点已经创建了， 开始创建对应的线程
            Map<Short, AbstractEndpoint> endpointMap = abstractSession.getEndpointMap();
            Map<Short, AbstractTcpProtocolWork<EndpointData>> tcpProtocolWorkMap = this.workExecutorMap.get(sessionId);
            //如果为null，这个应该是不会出现的，如果出现直接关闭会话
            if (Objects.isNull(tcpProtocolWorkMap)) {
                abstractSession.close();
                return;
            }
            if (CollUtil.isNotEmpty(endpointMap)) {
                //得到线程池
                final ThreadPoolExecutor threadPoolExecutor = this.getThreadPoolExecutor();
                endpointMap.values().forEach(end -> {
                    //给每个端点开始创建工作线程
                    String workName = "end-".concat(sessionId).concat("-").concat(end.getAddress() + "");
                    DeviceEndpointDataHandlerWork endpointDataHandlerWork = new DeviceEndpointDataHandlerWork(end.getQueue(), abstractSession, workName, end);
                    AbstractTcpProtocolWork<? extends EndpointData> oldWork = tcpProtocolWorkMap.put(end.getAddress(), endpointDataHandlerWork);
                    if (oldWork != null) {
                        //如果之前已经存在则直接销毁
                        log.warn("删除之前的设备端点线程 sessionId {} address {}", sessionId, end.getAddress());
                        oldWork.stop();
                    }
                    //提交线程
                    threadPoolExecutor.execute(endpointDataHandlerWork);
                });
            }
        } else if (netDeviceEvent instanceof CloseNetDeviceEvent) {
            //设备关闭 则需要关闭对应的端点线程
            Map<Short, AbstractTcpProtocolWork<EndpointData>> workExecutorMap = this.workExecutorMap.get(sessionId);
            if (CollUtil.isNotEmpty(workExecutorMap)) {
                //关闭端点线程
                workExecutorMap.values().forEach(AbstractTcpProtocolWork::stop);
            }
        }
    }

    /**
     * 端点数据工作处理者
     */
    private class DeviceEndpointDataHandlerWork extends AbstractTcpProtocolWork<EndpointData> {

        /**
         * 端点对象
         */
        private final AbstractEndpoint abstractEndpoint;

        /**
         * 端点数据缓存fifo
         * 用于保存近期的数据
         */
        private final BoundedPriorityQueue<EndpointData> endpointDataFifo = new BoundedPriorityQueue<>(5, new Comparator<EndpointData>() {
            @Override
            public int compare(EndpointData o1, EndpointData o2) {
                return Long.compare(o2.getTimestamp(), o1.getTimestamp());
            }
        });

        /**
         * 构造 默认不执行
         *
         * @param blockingQueue   堵塞队列
         * @param abstractSession session
         * @param workName        工作线程名字，用于打印
         */
        protected DeviceEndpointDataHandlerWork(BlockingQueue<EndpointData> blockingQueue, AbstractSession abstractSession, String workName, AbstractEndpoint abstractEndpoint) {
            super(blockingQueue, abstractSession, workName);
            this.abstractEndpoint = abstractEndpoint;
        }

        /**
         * 得到最近的缓存数据
         *
         * @return
         */
        public List<EndpointData> getEndpointDataFifoList() {
            return endpointDataFifo.toList();
        }

        /**
         * 收到堵塞队列的数据后的处理
         *
         * @param endpointData    队列中的元素
         * @param abstractSession 当前处理的会话
         * @throws Exception
         */
        @Override
        public void doHandlerWork(EndpointData endpointData, AbstractSession abstractSession) throws Exception {
            log.info("收到端点数据 sessionId {} data {}", abstractSession.getSessionId(), endpointData);
//            deviceEndpointHandler.endpointHandler(abstractSession, abstractEndpoint, endpointData);
            if (abstractEndpoint.isOut()) {
                AbstractOutputNetTcpMessage outputNetTcpMessage = endpointDataSerialize.deserialize(abstractSession, abstractEndpoint, endpointData);
                if (outputNetTcpMessage != null) {
                    //写入到队列中，后面会有线程消费并发送
                    abstractSession.addSendNetTcpMessage(outputNetTcpMessage);
                }
            } else {
                endpointDataSerialize.serialize(abstractSession, abstractEndpoint, endpointData);
                //加入到缓存中
                endpointDataFifo.offer(endpointData);
                log.info("收到端点数据 sessionId {} data {} ", abstractSession.getSessionId(), endpointData);
            }
        }
    }
}
